C++异步:structured concurrency实现解析!
The following article is from 腾讯云开发者 Author 沈芳
导语| 本篇我们将介绍整个libunifex structure concurrency的实现思路,并结合一部分具体的cpo实现,对这部分的实现做深入分析。
前篇《C++异步:libunifex中的concepts详解!》中我们介绍了libunifex作为框架部分的concept设计,本篇我们将在这个基础上,继续介绍整个libunifex structure concurrency的实现思路,并结合一部分具体的cpo实现,对这部分的实现做深入分析。
一、Structured Concurrency
我们以一个简短的示例代码开启本章的内容:
single_thread_context tcontext;
int count = 0;
schedule(tcontext.get_scheduler())
| then([&] { ++count; })
| sync_wait();
这段代码的表达方式前面我们也介绍过,主要使用了ranges类同的pipeline表达,我们可以简单将这种表达方式看成是C++的一种特殊LINQ实现,一个专有的DSL,当然,作为一个DSL来说,就execution的整体设计而言,它被赋予了一些专有的特性和意义:所以,如果从一个DSL的角度来看execution的结构化 concurrency,我们容易得出类似以下的观点,对于execution的pipeline表达:
DSL定义(BNF组成)-首先是范式的组成,如上图所示,业务使用结构化并发表达的时候,整个范式是由Concurrency Pipeline::= Sender Factory { '|' Sender Adapter } '|' Receiver组成的。
Compiler-通常情况我们可以将|操作以及connect()加起来成是编译过程, 借由Compiler Time的特性支持, 我们可以通过connect()产生runtime所需的OperationState。
Execute-这个阶段就很自然了,OperationState的start()就是DSL本身执行的入口点,当然,执行结果最后是通过:set_value,set_error,set_done这几个receiver cpos来传递的。
本篇中我们将以这种思路结合一些Sender Factory,Send Adapter节点,以及这种结果处理节点的具体实现来展开.sync_wait()。
二、Sender Factory
各种Sender Factory cpos用于产生各类sender,前面我们也介绍过, sender最大的特征就是会触发set_value,set_error,set_done这几个用于结束通知的receiver cpos。此处我们以just() 为例,来看一下一个Sender Factory需要包含的实现内容,在后续文章中我们会再介绍另外一个schedule() cpo的实现。
(一)just实现解析
首先just(values...)的语义, 就是生成一个sender, 该sender可以向后续的节点通过receiver cpos传递values..., 我们具体来看一下libunifex的just()实现, 会比大家想的复杂一些, 这主要还是因为execution实现的整体思路就是在尝试定义一个DSL, 然后这个DSL本身是自恰的, 比如对于just()来说, 必然会包含以下几个部分:
sender生产方法-just() cpo本身。
just::sender的实现-具体的sender实现。
相关的OperationState-节点可参与异步操作执行,则必然可以通过connect()来产生其OperationState对象,最后对start()作出响应。
我们来分别看一下这三部分的具体代码实现:
sender生产方法
constexpr auto just_cpo::operator()(Values&&... values) const {
return _just::sender<Values...>{std::in_place, (Values&&)values...};
}
just()的入口定义比较简单,主要是根据输入的变参values...构造一个_just::sender<Values...>{}对象并返回。这就是我们下一节要介绍的sender实现。
bind_back()版的operator()重载主要用于pipeline组织,代码大量雷同,本篇将统一忽略,方便源码的阅读,有兴趣的读者可以自行翻阅相关的实现。
just::sender的实现
_just::sender<> 其实真实类型是 _just::_sender<>::type,这个只是libunifex惯用的一种封装方式,具体的实现如下:
template <typename... Values>
class just::sender<Values...> {
private:
std::tuple<Values...> values_;
public:
template </*...*/>
using value_types = Variant<Tuple<Values...>>;
template </*...*/>
using error_types = Variant<std::exception_ptr>;
static constexpr bool sends_done = false;
template<typename... Values2>
explicit sender(std::in_place_t, Values2&&... values)
: values_((Values2 &&) values...) {}
template<typename This, typename Receiver>
friend auto tag_invoke(tag_t<connect>, This&& that, Receiver&& r)
-> operation<Receiver, Values...> {
return {static_cast<This&&>(that).values_, static_cast<Receiver&&>(r)};
}
};
这是一个很标准的sender实现,如我们在《C++异步:libunifex中的concepts详解!》中介绍的一样。首先是sender traits需要的类型定义部分,决定了sender可能触发的receiver cpos的参数和类型:
template </*...*/>
using value_types = Variant<Tuple<Values...>>;
template </*...*/>
using error_types = Variant<std::exception_ptr>;
static constexpr bool sends_done = false;
其次是通过tag_invoke定义的connect()实现:
template<typename This, typename Receiver>
friend auto tag_invoke(tag_t<connect>, This&& that, Receiver&& r)
-> operation<Receiver, Values...> {
return {static_cast<This&&>(that).values_, static_cast<Receiver&&>(r)};
}
此处返回的operation<>也是我们下一节要介绍的just()专用的OperationState实现。
相关的OperationState
template <typename Receiver, typename... Values>
struct just::operation<Receiver, Values...>::type {
std::tuple<Values...> values_;
Receiver receiver_;
void start() & noexcept {
try {
std::apply(
[&](Values&&... values) {
execution::set_value((Receiver &&) receiver_, (Values &&) values...);
},
std::move(values_));
} catch (...) {
execution::set_error((Receiver &&) receiver_, std::current_exception());
}
}
};
抛开绕来绕去的alias name来说, 这个OperationState的实现很简单, 存储了传入的values...和connect()时关联的Receiver, 并且在start()时向存储的Receiver调用set_value()传递存储下来的values...
(二)本章小结
对于一个sender factory类型的cpo来说,我们始终可以将其实现简单的分成以下几部分:
sender生产方法-如just()。
sender的实现-具体的sender实现。
相关的OperationState-节点可参与异步操作执行,则必然可以通过connect()来产生其OperationState对象,最后对start()作出响应。因为用于产生一个sender,这类节点一般都出现在structured concurrency描述的最左侧,负责作为后续节点的数据来源,如最开始的示例代码中那样。
三、Sender Adapter
首先我们知道Sender Adapter是作为中间节点存在的:
Concurrency Pipeline ::= Sender Factory { '|' Sender Adapter } '|' Receiver
我们先来看一下Sender Adapter语义层面的特征:
Sender Adapter是Sender的包装器,接收前置Sender对象后形成新的Sender对象。
新的Sender对象有自己的异步类型定义,同样也通过receiver cpos向后续节点传递异步操作结果。
Sender Adapter其实就像一个filter,它对原始的异步处理结果进行加工,产生新的结果,大致的工作情况如下图所示:
如上图所示,对于一个Sender Adapter定义,至少会包含两个对象:
Internal Receiver-用于接收Previous Sender发送的结果,处理自己的逻辑后再将结果发往后续节点。
Internal Sender-SenderAdapter(Sender,args...)形成一个新的Sender,连接到后续节点。当然,还会有一个用于作为入口的cpo。
我们具体以比较常用的then()的实现来具体看一下libunifex中一个典型的Sender Adapter是如何实现的:
(一)then() cpo
then()节点的作用是从上一个节点中获取异步返回值后,用该返回值作为输入值调用传入then()节点的函数后,将调用结果作为异步操作的结果返回后续节点,简单的图示如下:
对应的代码实现为:
template<typename Sender, typename Func>
auto then_cpo::operator()(Sender&& predecessor, Func&& func) const
-> _result_t<Sender, Func> {
return execution::tag_invoke(_fn{}, (Sender&&)predecessor, (Func&&)func);
}
template<typename Sender, typename Func>
auto then_cpo::operator()(Sender&& predecessor, Func&& func) const
-> _result_t<Sender, Func> {
return _then::sender<Sender, Func>{(Sender &&) predecessor, (Func &&) func};
}
};
then()调用的处理区分了传入的Func是否可tag_invoke的判断,我们直接看最通常的情况,传入的是普通函数:
template<typename Sender, typename Func>
auto then_cpo::operator()(Sender&& predecessor, Func&& func) const
-> _result_t<Sender, Func> {
return _then::sender<Sender, Func>{(Sender &&) predecessor, (Func &&) func};
}
最后返回的是一个_then命名空间下定义的_then::sender<>对象,并且这个对象将前置的Sender对象和传入的func作为构造这个对象的参数。我们来看一下这个sender的具体实现:
(二)then()的Internal Sender实现
template <typename Predecessor, typename Func>
struct then::sender<Predecessor, Func>::type {
Predecessor pred_;
Func func_;
private:
template <typename... Args>
using result = /*unspecified*/;
public:
template </*unspecified*/>
using value_types = /*unspecified*/;
template </*unspecified*/>
using error_types = /*unspecified*/;
static constexpr bool sends_done = sender_traits<Predecessor>::sends_done;
template <typename Receiver>
using receiver_t = receiver_t<Receiver, Func>;
template<typename Sender, typename Receiver>
friend auto tag_invoke(tag_t<execution::connect>, Sender&& s, Receiver&& r)
-> connect_result_t<member_t<Sender, Predecessor>, receiver_t<remove_cvref_t<Receiver>>> {
return execution::connect(
static_cast<Sender&&>(s).pred_,
receiver_t<remove_cvref_t<Receiver>>{
static_cast<Sender&&>(s).func_,
static_cast<Receiver&&>(r)});
}
};
跟我们前面看到的just()内的sender实现一样,包含了基本的sender types定义,以及sender相关的connect()tag_invoke定义:
template<typename Sender, typename Receiver>
friend auto tag_invoke(tag_t<execution::connect>, Sender&& s, Receiver&& r)
-> connect_result_t<member_t<Sender, Predecessor>, receiver_t<remove_cvref_t<Receiver>>> {
return execution::connect(
static_cast<Sender&&>(s).pred_,
receiver_t<remove_cvref_t<Receiver>>{
static_cast<Sender&&>(s).func_,
static_cast<Receiver&&>(r)});
}
我们可以看到,对then()的sender进行connect()的时候,真正发生connect()的是我们之前在then(Previous Sender,Func)调用时缓存下来的上一节点,以及新构建出的receiver_t<>对象,这个对象也是Func真正被执行的地方,同时这个对象也保存了后续的Reciver节点,方便向后续节点传递异步执行结果。
(三)then()的Internal Receiver实现
template <typename Receiver, typename Func>
struct then::receiver_t<Receiver, Func>::type {
Func func_;
Receiver receiver_;
template <typename... Values>
void set_value(Values&&... values) && noexcept {
using result_type = std::invoke_result_t<Func, Values...>;
if constexpr (std::is_void_v<result_type>) {
if constexpr (noexcept(std::invoke(
(Func &&) func_, (Values &&) values...))) {
std::invoke((Func &&) func_, (Values &&) values...);
execution::set_value((Receiver &&) receiver_);
} else {
try {
std::invoke((Func &&) func_, (Values &&) values...);
execution::set_value((Receiver &&) receiver_);
} catch (...) {
execution::set_error((Receiver &&) receiver_, std::current_exception());
}
}
} else {
if constexpr (noexcept(std::invoke(
(Func &&) func_, (Values &&) values...))) {
execution::set_value(
(Receiver &&) receiver_,
std::invoke((Func &&) func_, (Values &&) values...));
} else {
try {
execution::set_value(
(Receiver &&) receiver_,
std::invoke((Func &&) func_, (Values &&) values...));
} catch (...) {
execution::set_error((Receiver &&) receiver_, std::current_exception());
}
}
}
}
template <typename Error>
void set_error(Error&& error) && noexcept {
execution::set_error((Receiver &&) receiver_, (Error &&) error);
}
void set_done() && noexcept {
execution::set_done((Receiver &&) receiver_);
}
};
到receiver的实现这里就很自然了,通过set_value()接受前面的Sender传递过来的结果,将结果作为输入参数调用Func后,再通过set_value()向后续节点传递Func的返回值。
(四)本章小结
对于一个Sender Adapater类型的cpo来说,主要需要完成以下几件事情:
入口cpo(如then())-完成对前置Sender的接收和需要的参数的接收处理,创建一个专用的Internal Sender对象并返回。
Internal Sender-存储前置Sender和需要的参数,并实现tag_invoke(tag_t<execution::connect>)用于构建InternalReceiver,并将实际的connect()操作重定向到保存下来的前置Sender和新创建的InternalReceiver上。
InternalReceiver-获取前置Sender的异步结果,并在处理自身逻辑后,将最终的结果返回给后续节点。整体上来说可以将这看成一种wrapper机制, set_value是拦截点,在拦截点上插入自身逻辑,最后依然还是通过set_value返回下一步需要的异步执行结果。
四、sync_wait_r()与sync_wait()
libunifex的实现并没有提供一个类似default receiver的节点,但提供了工具节点sync_wait_r()和sync_wait(),当然,除了通过这种方式来处理返回结果,你也可以自行实现一个自己的Receiver来接收异步返回值。本章我们主要介绍sync_wait_r()和sync_wait()的实现,通过这两者,我们也能更深入理解libunifex常规状态下是如何发起一个异步操作执行并接收其返回结果的。
(一)cpo入口
sync_wait():
template<typename Sender>
auto sync_wait_cpo::operator()(Sender&& sender) const
-> std::optional</*...*/> {
using Result = /*...*/;
return _sync_wait::_impl<Result>((Sender&&) sender);
}
sync_wait_r():
template <typename Result>
decltype(auto) sync_wait_r_cpo::operator()(Sender&& sender) const {
using Result2 = non_void_t<wrap_reference_t<decay_rvalue_t<Result>>>;
return _sync_wait::_impl<Result2>((Sender&&) sender);
}
两者代码高度相似:
输入参数都是Sender。
利用_sync_wait::_impl<>来完成具体的实现。
两者的差异:
sync_wait_r<Result>允许业务侧指定返回值的类型,不支持pipeline操作,一般直接以sync_wait_r<Result>(Sender)的方式来使用。
sync_wait 直接使用传入的Sender来推导返回值类型,可以作为pipeline的终结节点使用,如just(1)|sync_wait()。
我们接下来看一看sync_wait和sync_wait_r都引用的_sync_wait::_impl的实现:
(二)sync_wait::_impl的实现
auto _impl(Sender&& sender) {
manual_event_loop ctx;
// Store state for the operation on the stack.
auto operation = connect(
(Sender&&)sender,
_sync_wait::receiver_t<Result>{promise, ctx});
start(operation);
ctx.run();
// ... (retsult handling here)
}
整体实现比较简洁,我们主要关注几点:
_impl()最终的返回值类型为std::optional<Result>。
整个函数的实现完成了前面的们提到的connect()产生OperationState,再执行start()的过程。
connect()时与传入的Sender进行连接的Receiver是自定义的_sync_wait::_receiver<T>::type类型。
ctx.run()等待最终执行的完成(相关详细分析可参考后续文章)。
根据promise.state_记录的类型对返回值进行处理(正确返回值还是抛异常)。
剩下的就只有_sync_wait::receiver_t<>的实现了,我们接着来看一下这部分的实现:
(三)_sync_wait::receiver_t<>的实现
template <typename T>
struct sync_wait::receiver_t {
promise<T>& promise_;
manual_event_loop& ctx_;
template <typename... Values>
void set_value(Values&&... values) && noexcept {
try {
execution::activate_union_member(promise_.value_, (Values&&)values...);
promise_.state_ = promise<T>::state::value;
} catch (...) {
execution::activate_union_member(promise_.exception_, std::current_exception());
promise_.state_ = promise<T>::state::error;
}
signal_complete();
}
void set_error(std::exception_ptr err) && noexcept {
execution::activate_union_member(promise_.exception_, std::move(err));
promise_.state_ = promise<T>::state::error;
signal_complete();
}
void set_error(std::error_code ec) && noexcept {
std::move(*this).set_error(make_exception_ptr(std::system_error{ec, "sync_wait"}));
}
template <typename Error>
void set_error(Error&& e) && noexcept {
std::move(*this).set_error(make_exception_ptr((Error&&)e));
}
void set_done() && noexcept {
promise_.state_ = promise<T>::state::done;
signal_complete();
}
private:
void signal_complete() noexcept {
ctx_.stop();
}
};
这就是一个很标准的receiver实现,利用set_value,set_error,set_done的重载来完成对前置Sender执行结果的获取,通过前面的代码我们容易知道,如果是无异常的状态,则正常的通过std::optional<>来返回执行结果,否则抛出异常。另外,代码中的signal_complete()用于通知_impl函数中的 ctx.run()返回,最终向用户返回异步操作的结果。
五、总结
本篇我们从libunifex的structured concurrency设计开始,简述了整套execution整套DSL的组织和执行的逻辑,并结合具体的:
Sender Factory实现举例-just()。
Sender Adapter实现举例-then()。
终结节点-sync_wait()和sync_wait_r()加深大家对execution各类节点实现的理解。
structured concurrency的设计是整个库的核心,理解了它,也能方便我们理解一些基础节点的实现,也为自己定制更多业务化的节点提供良好的基础。这也是为什么execution库也被当成一个库作者向的特性的原因,与其说它是一个异步库,不如说它在尝试定义一套从DSL到执行态都比较完备的c++异步专用语言。当然,后者的学习成本比学习一个库明显会高出比较多。
参考资料:
1.libunifex源码库
作者简介
沈芳
腾讯后台开发工程师
IEG研发效能部开发人员,毕业于华中科技大学。目前负责CrossEngine Server的开发工作,对GamePlay技术比较感兴趣。
参考阅读:
本文由高可用架构转载。技术原创及架构实践文章,欢迎通过公众号菜单「联系我们」进行投稿。
活动预告
↓↓↓
GIAC 全球互联网架构大会 2022 将于 7 月 22 - 23 日在深圳举行,本届 GIAC 议题共设置有 24 个专题,覆盖各类架构热点领域,每个主题由业内知名架构师、技术负责人等专家担任出品人,负责议题选取和质量把控。本次大会包括数据智能平台演进(由白海科技创始人兼CEO卢亿雷担任出品人)和湖仓一体(由Shopee Data Infra Team罗李担任出品人)等专题,将有更多本文相关内容演讲,点击阅读原文查看 GIAC 详细日程。
点击【阅读原文】,了解更多活动信息。